Skip to content

Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 74 additions & 10 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
@preconcurrency import SystemPackage
#endif

internal import Dispatch

#if SubprocessSpan
@available(SubprocessSpan, *)
#endif
Expand All @@ -27,38 +29,100 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
public typealias Element = SequenceOutput.Buffer

private let diskIO: TrackedPlatformDiskIO
private let bufferSize: Int
private var buffer: [UInt8]
private var currentPosition: Int
private var finished: Bool
private var streamIterator: AsyncThrowingStream<StreamStatus, Swift.Error>.AsyncIterator

internal init(diskIO: TrackedPlatformDiskIO) {
internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) {
self.diskIO = diskIO
self.bufferSize = bufferSize
self.buffer = []
self.currentPosition = 0
self.finished = false
self.streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
Copy link
Contributor

@iCharlesHu iCharlesHu May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncBufferSequence is a type shared across all platforms, so we can't unconditionally refer to the platform-specific dispatchIO here. Otherwise, we may see Windows build failures as a result.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. I'll look into implementing this in such a way that platforms without dispatchIO don't break.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

public func next() async throws -> SequenceOutput.Buffer? {
let data = try await self.diskIO.readChunk(
upToLength: readBufferSize
)
if data == nil {
// We finished reading. Close the file descriptor now
public mutating func next() async throws -> SequenceOutput.Buffer? {
if let status = try await streamIterator.next() {
switch status {
case .data(let data):
return data

case .endOfStream(let data):
streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
return data

case .endOfFile:
try self.diskIO.safelyClose()
return nil
}
} else {
try self.diskIO.safelyClose()
return nil
}
return data
}

/// One event produced by a single read issued through `createDataStream`.
private enum StreamStatus {
/// A non-empty chunk was delivered and the read handler did not report `done`.
case data(SequenceOutput.Buffer)
/// A non-empty chunk was delivered together with `done`; the iterator
/// returns the chunk and starts a fresh read stream.
case endOfStream(SequenceOutput.Buffer)
/// The read handler reported `done` with no (or empty) data; the iterator
/// closes the descriptor and ends the sequence.
case endOfFile
}

/// Wraps one `DispatchIO` read in an `AsyncThrowingStream`.
///
/// A single read of up to `bufferSize` bytes is issued at offset 0 on a
/// global queue. Every invocation of the read handler is translated into
/// a `StreamStatus`, and the stream is finished once the handler reports
/// that the read is done or that it failed.
///
/// - Parameters:
///   - dispatchIO: The channel to read from.
///   - bufferSize: The `length` passed to the read call.
/// - Returns: A stream of `StreamStatus` values that finishes by throwing
///   a `SubprocessError` when the handler reports a non-zero error code.
private static func createDataStream(with dispatchIO: DispatchIO, bufferSize: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
    AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
        dispatchIO.read(offset: 0, length: bufferSize, queue: .global()) { isDone, chunk, errorCode in
            // A non-zero code terminates the stream with an error; nothing
            // from this invocation is yielded.
            guard errorCode == 0 else {
                continuation.finish(throwing: SubprocessError(
                    code: .init(.failedToReadFromSubprocess),
                    underlyingError: .init(rawValue: errorCode)
                ))
                return
            }

            // An empty chunk is handled exactly like a missing one.
            let payload = chunk.flatMap { $0.isEmpty ? nil : $0 }

            if let payload = payload {
                let output = SequenceOutput.Buffer(data: payload)
                let status: StreamStatus = isDone ? .endOfStream(output) : .data(output)
                continuation.yield(status)
            } else if isDone {
                continuation.yield(.endOfFile)
            } else {
                // No data and not done: there is nothing to report yet.
                return
            }

            if isDone {
                continuation.finish()
            }
        }
    }
}
}

private let diskIO: TrackedPlatformDiskIO
private let bufferSize: Int

internal init(diskIO: TrackedPlatformDiskIO) {
internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) {
self.diskIO = diskIO
self.bufferSize = bufferSize
}

public func makeAsyncIterator() -> Iterator {
return Iterator(diskIO: self.diskIO)
return Iterator(diskIO: self.diskIO, bufferSize: bufferSize)
}
}

Expand Down
26 changes: 23 additions & 3 deletions Sources/Subprocess/Execution.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import Musl
import WinSDK
#endif

internal import Dispatch

/// An object that represents a subprocess that has been
/// executed. You can use this object to send signals to the
/// child process as well as stream its output and error.
Expand Down Expand Up @@ -107,7 +109,16 @@ extension Execution where Output == SequenceOutput {
else {
fatalError("The standard output has already been consumed")
}
return AsyncBufferSequence(diskIO: readFd)

if let lowWater = output.lowWater {
readFd.dispatchIO.setLimit(lowWater: lowWater)
}

if let highWater = output.highWater {
readFd.dispatchIO.setLimit(highWater: highWater)
}

return AsyncBufferSequence(diskIO: readFd, bufferSize: output.bufferSize)
}
}

Expand All @@ -122,15 +133,24 @@ extension Execution where Error == SequenceOutput {
/// via pipe under the hood and each pipe can only be consumed once.
public var standardError: AsyncBufferSequence {
let consumptionState = self.outputConsumptionState.bitwiseXor(
OutputConsumptionState.standardOutputConsumed
OutputConsumptionState.standardErrorConsumed
)

guard consumptionState.contains(.standardErrorConsumed),
let readFd = self.errorPipe.readEnd
else {
fatalError("The standard output has already been consumed")
}
return AsyncBufferSequence(diskIO: readFd)

if let lowWater = error.lowWater {
readFd.dispatchIO.setLimit(lowWater: lowWater)
}

if let highWater = error.highWater {
readFd.dispatchIO.setLimit(highWater: highWater)
}

return AsyncBufferSequence(diskIO: readFd, bufferSize: error.bufferSize)
}
}

Expand Down
14 changes: 12 additions & 2 deletions Sources/Subprocess/IO/Output.swift
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,15 @@ public struct BytesOutput: OutputProtocol {
#endif
public struct SequenceOutput: OutputProtocol {
public typealias OutputType = Void

internal init() {}
/// Optional low-water mark; when non-nil it is passed to
/// `dispatchIO.setLimit(lowWater:)` on the pipe's read end before the
/// output sequence is created.
internal let lowWater: Int?
/// Optional high-water mark; when non-nil it is passed to
/// `dispatchIO.setLimit(highWater:)` on the pipe's read end before the
/// output sequence is created.
internal let highWater: Int?
/// Size handed to `AsyncBufferSequence` as its `bufferSize`.
internal let bufferSize: Int

/// Creates a sequence output configuration.
///
/// - Parameters:
///   - lowWater: Low-water mark to apply, or `nil` to leave it untouched.
///   - highWater: High-water mark to apply, or `nil` to leave it untouched.
///   - bufferSize: Read buffer size; defaults to `readBufferSize`.
internal init(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int = readBufferSize) {
self.lowWater = lowWater
self.highWater = highWater
self.bufferSize = bufferSize
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think it’s appropriate to include these parameters here for a couple of reasons:

  • (This isn’t directly related to your change) right now, we’re in the middle of some major architectural updates: Adopt ~Copyable in Subprocess #38. This PR makes SequenceOutput internal, so you can’t use .sequence or .sequence(lowWater: …) anymore.
  • More importantly, this looks like a platform-specific feature. Setting this parameter won’t have any impact on Windows, and (also unrelated to your change) we’re planning to move away from DispatchIO on Linux soon, so it won’t work there either.

Considering all this, I suggest we move these parameters to Darwin’s specific PlatformOptions, maybe under a nested struct PlatformOptions.StreamOptions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. I'll look into moving these parameters.

}

#if SubprocessSpan
Expand Down Expand Up @@ -284,6 +291,9 @@ extension OutputProtocol where Self == SequenceOutput {
/// to the `.standardOutput` (or `.standardError`) property
/// of `Execution` as `AsyncSequence<Data>`.
public static var sequence: Self { .init() }
/// Creates a `SequenceOutput` with optional water marks and buffer size.
///
/// - Parameters:
///   - lowWater: Low-water mark forwarded to the output, or `nil`.
///   - highWater: High-water mark forwarded to the output, or `nil`.
///   - bufferSize: Read buffer size; `readBufferSize` is used when `nil`.
/// - Returns: The configured `SequenceOutput`.
public static func sequence(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int? = nil) -> Self {
    // Substitute the module default when the caller gives no size.
    let effectiveBufferSize = bufferSize ?? readBufferSize
    return SequenceOutput(
        lowWater: lowWater,
        highWater: highWater,
        bufferSize: effectiveBufferSize
    )
}
}

// MARK: - Span Default Implementations
Expand Down
38 changes: 38 additions & 0 deletions Tests/SubprocessTests/SubprocessTests+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,44 @@ extension SubprocessUnixTests {
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.standardError == expected)
}

@Test func testSlowDripRedirectedOutputRedirectToSequence() async throws {
let threshold: Double = 0.5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately in tests you'll have to write

guard #available(SubprocessSpan , *) else {
    return
}

at the beginning of the test to work around the same availability issue. See other tests for examples.

Copy link
Author

@rdingman rdingman May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iCharlesHu When I first started working on this, I was very confused as to why some of the tests weren't running my new code and it was because of this check. Wouldn't it be better to have them skipped and noted as such in the test output rather than falsely succeeding? I'm thinking something like this:

    @Test(
        .enabled(
            if: {
                if #available(SubprocessSpan , *) {
                    true
                } else {
                    false
                }
            }(),
            "This test requires SubprocessSpan"
        )
    )
    func testSlowDripRedirectedOutputRedirectToSequence() async throws {
    }

Of course, we can have a helper function to make this less verbose.

Thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iCharlesHu I went ahead and conditionalized this one test this way as an example. Let me know if you don't like that and would like me to revert to a guard

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdingman unfortunately this won't work because on Swift 6.2 and above the compiler will complain the code inside of testSlowDripRedirectedOutputRedirectToSequence needs SubprocessSpan and ask you to put @available around the function. That doesn't work either because the macro won't pick it up.

Unfortunately so far this is the only way I found that works. Very ugly... but it'll have to do for now.

Copy link
Author

@rdingman rdingman May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iCharlesHu Hmm. This builds just fine for me with the latest Swift 6.2 toolchain, has the intended results, and works with the currently shipping Swift 6.1 toolchain. However, I'll go ahead and revert this and match the existing tests.


let script = """
echo "DONE"
sleep \(threshold)
"""

let start = ContinuousClock().now

let catResult = try await Subprocess.run(
.path("/bin/bash"),
arguments: ["-c", script],
output: .sequence(lowWater: 0),
error: .discarded,
body: { (execution, _) in
for try await chunk in execution.standardOutput {
let string = chunk.withUnsafeBytes { String(decoding: $0, as: UTF8.self) }

if string.hasPrefix("DONE") {
let end = ContinuousClock().now

if (end - start) > .seconds(threshold) {
return "Failure"

} else {
return "Success"
}
}
}

return "Failure"
}
)
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.value == "Success")
}
}

// MARK: - PlatformOption Tests
Expand Down
Loading