Skip to content

Unify tests on all supported platforms to ensure consistent behavior and add more tests. #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,33 +79,39 @@ public struct Configuration: Sendable {

let execution = _spawnResult.execution

let result: Swift.Result<Result, Error>
do {
result = try await .success(withAsyncTaskCleanupHandler {
let inputIO = _spawnResult.inputWriteEnd()
let outputIO = _spawnResult.outputReadEnd()
let errorIO = _spawnResult.errorReadEnd()
return try await withAsyncTaskCleanupHandler {
let inputIO = _spawnResult.inputWriteEnd()
let outputIO = _spawnResult.outputReadEnd()
let errorIO = _spawnResult.errorReadEnd()

let result: Swift.Result<Result, Error>
do {
// Body runs in the same isolation
return try await body(_spawnResult.execution, inputIO, outputIO, errorIO)
} onCleanup: {
// Attempt to terminate the child process
await execution.runTeardownSequence(
self.platformOptions.teardownSequence
)
})
} catch {
result = .failure(error)
}
let bodyResult = try await body(_spawnResult.execution, inputIO, outputIO, errorIO)
result = .success(bodyResult)
} catch {
result = .failure(error)
}

// Ensure that we begin monitoring process termination after `body` runs
// and regardless of whether `body` throws, so that the pid gets reaped
// even if `body` throws, and we are not leaving zombie processes in the
// process table which will cause the process termination monitoring thread
// to effectively hang due to the pid never being awaited
let terminationStatus = try await Subprocess.monitorProcessTermination(forExecution: _spawnResult.execution)
// Ensure that we begin monitoring process termination after `body` runs
// and regardless of whether `body` throws, so that the pid gets reaped
// even if `body` throws, and we are not leaving zombie processes in the
// process table which will cause the process termination monitoring thread
// to effectively hang due to the pid never being awaited
let terminationStatus = try await monitorProcessTermination(
forExecution: _spawnResult.execution
)

return try ExecutionResult(terminationStatus: terminationStatus, value: result.get())
return ExecutionResult(
terminationStatus: terminationStatus,
value: try result.get()
)
} onCleanup: {
// Attempt to terminate the child process
await execution.runTeardownSequence(
self.platformOptions.teardownSequence
)
}
}
}

Expand Down Expand Up @@ -329,12 +335,12 @@ public struct Arguments: Sendable, ExpressibleByArrayLiteral, Hashable {
self.executablePathOverride = nil
}
}
#endif

public init(_ array: [[UInt8]]) {
self.storage = array.map { .rawBytes($0) }
self.executablePathOverride = nil
}
#endif
}

extension Arguments: CustomStringConvertible, CustomDebugStringConvertible {
Expand Down Expand Up @@ -864,7 +870,7 @@ internal struct CreatedPipe: ~Copyable {
DWORD(readBufferSize),
DWORD(readBufferSize),
0,
&saAttributes
nil
)
}
guard let parentEnd, parentEnd != INVALID_HANDLE_VALUE else {
Expand Down
4 changes: 3 additions & 1 deletion Sources/Subprocess/IO/AsyncIO+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ internal import Dispatch
final class AsyncIO: Sendable {
static let shared: AsyncIO = AsyncIO()

private init() {}
internal init() {}

internal func shutdown() { /* noop on Darwin */ }

internal func read(
from diskIO: borrowing IOChannel,
Expand Down
23 changes: 19 additions & 4 deletions Sources/Subprocess/IO/AsyncIO+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ final class AsyncIO: Sendable {
static let shared: AsyncIO = AsyncIO()

private let state: Result<State, SubprocessError>
private let shutdownFlag: Atomic<UInt8> = Atomic(0)

private init() {
internal init() {
// Create main epoll fd
let epollFileDescriptor = epoll_create1(CInt(EPOLL_CLOEXEC))
guard epollFileDescriptor >= 0 else {
Expand Down Expand Up @@ -204,11 +205,15 @@ final class AsyncIO: Sendable {
}
}

private func shutdown() {
internal func shutdown() {
guard case .success(let currentState) = self.state else {
return
}

guard self.shutdownFlag.add(1, ordering: .sequentiallyConsistent).newValue == 1 else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be >=1 instead of ==1? I think as written this will run the shutdown procedure n-1 times for n calls.

// We already closed this AsyncIO
return
}
var one: UInt64 = 1
// Wake up the thread for shutdown
_ = _SubprocessCShims.write(currentState.shutdownFileDescriptor, &one, MemoryLayout<UInt64>.stride)
Expand All @@ -226,7 +231,6 @@ final class AsyncIO: Sendable {
}
}


private func registerFileDescriptor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should registerFileDescriptor precondition that the fd is not already in the _registration map?

_ fileDescriptor: FileDescriptor,
for event: Event
Expand Down Expand Up @@ -277,11 +281,12 @@ final class AsyncIO: Sendable {
&event
)
if rc != 0 {
let capturedError = errno
let error = SubprocessError(
code: .init(.asyncIOFailed(
"failed to add \(fileDescriptor.rawValue) to epoll list")
),
underlyingError: .init(rawValue: errno)
underlyingError: .init(rawValue: capturedError)
)
continuation.finish(throwing: error)
return
Expand Down Expand Up @@ -344,6 +349,9 @@ extension AsyncIO {
from fileDescriptor: FileDescriptor,
upTo maxLength: Int
) async throws -> [UInt8]? {
guard maxLength > 0 else {
return nil
}
// If we are reading until EOF, start with readBufferSize
// and gradually increase buffer size
let bufferLength = maxLength == .max ? readBufferSize : maxLength
Expand Down Expand Up @@ -407,6 +415,7 @@ extension AsyncIO {
}
}
}
resultBuffer.removeLast(resultBuffer.count - readLength)
return resultBuffer
}

Expand All @@ -421,6 +430,9 @@ extension AsyncIO {
_ bytes: Bytes,
to diskIO: borrowing IOChannel
) async throws -> Int {
guard bytes.count > 0 else {
return 0
}
let fileDescriptor = diskIO.channel
let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write)
var writtenLength: Int = 0
Expand Down Expand Up @@ -464,6 +476,9 @@ extension AsyncIO {
_ span: borrowing RawSpan,
to diskIO: borrowing IOChannel
) async throws -> Int {
guard span.byteCount > 0 else {
return 0
}
let fileDescriptor = diskIO.channel
let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write)
var writtenLength: Int = 0
Expand Down
45 changes: 33 additions & 12 deletions Sources/Subprocess/IO/AsyncIO+Windows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ final class AsyncIO: @unchecked Sendable {
static let shared = AsyncIO()

private let ioCompletionPort: Result<HANDLE, SubprocessError>

private let monitorThread: Result<HANDLE, SubprocessError>
private let shutdownFlag: Atomic<UInt8> = Atomic(0)

private init() {
internal init() {
var maybeSetupError: SubprocessError? = nil
// Create the the completion port
guard let port = CreateIoCompletionPort(
Expand All @@ -78,10 +78,11 @@ final class AsyncIO: @unchecked Sendable {
/// > thread management rather than CreateThread and ExitThread
let threadHandleValue = _beginthreadex(nil, 0, { args in
func reportError(_ error: SubprocessError) {
_registration.withLock { store in
for continuation in store.values {
continuation.finish(throwing: error)
}
let continuations = _registration.withLock { store in
return store.values
}
for continuation in continuations {
continuation.finish(throwing: error)
}
}

Expand Down Expand Up @@ -110,11 +111,13 @@ final class AsyncIO: @unchecked Sendable {
// in the store. Windows does not offer an API to remove a
// HANDLE from an IOCP port, therefore we leave the registration
// to signify the HANDLE has already been resisted.
_registration.withLock { store in
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
if let continuation = store[targetFileDescriptor] {
continuation.finish()
return continuation
}
return nil
}
continuation?.finish()
continue
} else {
let error = SubprocessError(
Expand Down Expand Up @@ -159,12 +162,17 @@ final class AsyncIO: @unchecked Sendable {
}
}

private func shutdown() {
// Post status to shutdown HANDLE
internal func shutdown() {
guard case .success(let ioPort) = ioCompletionPort,
case .success(let monitorThreadHandle) = monitorThread else {
return
}
// Make sure we don't shutdown the same instance twice
guard self.shutdownFlag.add(1, ordering: .relaxed).newValue == 1 else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to be >=1?

// We already closed this AsyncIO
return
}
// Post status to shutdown HANDLE
PostQueuedCompletionStatus(
ioPort, // CompletionPort
0, // Number of bytes transferred.
Expand Down Expand Up @@ -245,6 +253,9 @@ final class AsyncIO: @unchecked Sendable {
from handle: HANDLE,
upTo maxLength: Int
) async throws -> [UInt8]? {
guard maxLength > 0 else {
return nil
}
// If we are reading until EOF, start with readBufferSize
// and gradually increase buffer size
let bufferLength = maxLength == .max ? readBufferSize : maxLength
Expand Down Expand Up @@ -284,8 +295,12 @@ final class AsyncIO: @unchecked Sendable {
// Make sure we only get `ERROR_IO_PENDING` or `ERROR_BROKEN_PIPE`
let lastError = GetLastError()
if lastError == ERROR_BROKEN_PIPE {
// We reached EOF
return nil
// We reached EOF. Return whatever's left
guard readLength > 0 else {
return nil
}
resultBuffer.removeLast(resultBuffer.count - readLength)
return resultBuffer
}
guard lastError == ERROR_IO_PENDING else {
let error = SubprocessError(
Expand Down Expand Up @@ -337,6 +352,9 @@ final class AsyncIO: @unchecked Sendable {
_ span: borrowing RawSpan,
to diskIO: borrowing IOChannel
) async throws -> Int {
guard span.byteCount > 0 else {
return 0
}
let handle = diskIO.channel
var signalStream = self.registerHandle(diskIO.channel).makeAsyncIterator()
var writtenLength: Int = 0
Expand Down Expand Up @@ -389,6 +407,9 @@ final class AsyncIO: @unchecked Sendable {
_ bytes: Bytes,
to diskIO: borrowing IOChannel
) async throws -> Int {
guard bytes.count > 0 else {
return 0
}
let handle = diskIO.channel
var signalStream = self.registerHandle(diskIO.channel).makeAsyncIterator()
var writtenLength: Int = 0
Expand Down
11 changes: 3 additions & 8 deletions Sources/Subprocess/IO/Input.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,15 @@ public protocol InputProtocol: Sendable, ~Copyable {
public struct NoInput: InputProtocol {
internal func createPipe() throws -> CreatedPipe {
#if os(Windows)
// On Windows, instead of binding to dev null,
// we don't set the input handle in the `STARTUPINFOW`
// to signal no input
return CreatedPipe(
readFileDescriptor: nil,
writeFileDescriptor: nil
)
let devnullFd: FileDescriptor = try .openDevNull(withAccessMode: .writeOnly)
let devnull = HANDLE(bitPattern: _get_osfhandle(devnullFd.rawValue))!
#else
let devnull: FileDescriptor = try .openDevNull(withAccessMode: .readOnly)
#endif
return CreatedPipe(
readFileDescriptor: .init(devnull, closeWhenDone: true),
writeFileDescriptor: nil
)
#endif
}

public func write(with writer: StandardInputWriter) async throws {
Expand Down
29 changes: 20 additions & 9 deletions Sources/Subprocess/IO/Output.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,17 @@ public struct DiscardedOutput: OutputProtocol {
public typealias OutputType = Void

internal func createPipe() throws -> CreatedPipe {

#if os(Windows)
// On Windows, instead of binding to dev null,
// we don't set the input handle in the `STARTUPINFOW`
// to signal no output
return CreatedPipe(
readFileDescriptor: nil,
writeFileDescriptor: nil
)
let devnullFd: FileDescriptor = try .openDevNull(withAccessMode: .writeOnly)
let devnull = HANDLE(bitPattern: _get_osfhandle(devnullFd.rawValue))!
#else
let devnull: FileDescriptor = try .openDevNull(withAccessMode: .readOnly)
let devnull: FileDescriptor = try .openDevNull(withAccessMode: .writeOnly)
#endif
return CreatedPipe(
readFileDescriptor: nil,
writeFileDescriptor: .init(devnull, closeWhenDone: true)
)
#endif
}

internal init() {}
Expand Down Expand Up @@ -289,6 +285,7 @@ extension OutputProtocol {
from diskIO: consuming IOChannel?
) async throws -> OutputType {
if OutputType.self == Void.self {
try diskIO?.safelyClose()
return () as! OutputType
}
// `diskIO` is only `nil` for any types that conform to `OutputProtocol`
Expand Down Expand Up @@ -330,6 +327,7 @@ extension OutputProtocol {
underlyingError: nil
)
}

#if canImport(Darwin)
return try self.output(from: result ?? .empty)
#else
Expand Down Expand Up @@ -400,3 +398,16 @@ extension DispatchData {
return result ?? []
}
}

extension FileDescriptor {
internal static func openDevNull(
withAccessMode mode: FileDescriptor.AccessMode
) throws -> FileDescriptor {
#if os(Windows)
let devnull: FileDescriptor = try .open("NUL", mode)
#else
let devnull: FileDescriptor = try .open("/dev/null", mode)
#endif
return devnull
}
}
Loading
Loading