Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var defaultTraits: Set<String> = ["SubprocessFoundation"]
defaultTraits.insert("SubprocessSpan")
#endif

let packageSwiftSettings: [SwiftSetting] = [
.define("SUBPROCESS_ASYNCIO_DISPATCH", .when(platforms: [.macOS, .custom("freebsd"), .openbsd]))
]

let package = Package(
name: "Subprocess",
platforms: [.macOS(.v13), .iOS("99.0")],
Expand Down Expand Up @@ -58,7 +62,7 @@ let package = Package(
.enableExperimentalFeature("NonescapableTypes"),
.enableExperimentalFeature("LifetimeDependence"),
.enableExperimentalFeature("Span"),
]
] + packageSwiftSettings
),
.testTarget(
name: "SubprocessTests",
Expand All @@ -70,7 +74,7 @@ let package = Package(
],
swiftSettings: [
.enableExperimentalFeature("Span"),
]
] + packageSwiftSettings
),

.target(
Expand Down
6 changes: 3 additions & 3 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
public typealias Failure = any Swift.Error
public typealias Element = Buffer

#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
internal typealias DiskIO = DispatchIO
#elseif canImport(WinSDK)
internal typealias DiskIO = HANDLE
Expand Down Expand Up @@ -55,7 +55,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
)
guard let data else {
// We finished reading. Close the file descriptor now
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
try _safelyClose(.dispatchIO(self.diskIO))
#elseif canImport(WinSDK)
try _safelyClose(.handle(self.diskIO))
Expand Down Expand Up @@ -137,7 +137,7 @@ extension AsyncBufferSequence {
self.eofReached = true
return nil
}
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
// Unfortunately here we _have to_ copy the bytes out because
// DispatchIO (rightfully) reuses buffer, which means `buffer.data`
// has the same address on all iterations, therefore we can't directly
Expand Down
10 changes: 5 additions & 5 deletions Sources/Subprocess/Buffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
extension AsyncBufferSequence {
/// A immutable collection of bytes
public struct Buffer: Sendable {
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
// We need to keep the backingData alive while Slice is alive
internal let backingData: DispatchData
internal let data: DispatchData.Region
Expand Down Expand Up @@ -45,7 +45,7 @@ extension AsyncBufferSequence {
internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
return [.init(data: data)]
}
#endif // canImport(Darwin)
#endif // SUBPROCESS_ASYNCIO_DISPATCH
}
}

Expand Down Expand Up @@ -92,7 +92,7 @@ extension AsyncBufferSequence.Buffer {

// MARK: - Hashable, Equatable
extension AsyncBufferSequence.Buffer: Equatable, Hashable {
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool {
return lhs.data == rhs.data
}
Expand All @@ -104,7 +104,7 @@ extension AsyncBufferSequence.Buffer: Equatable, Hashable {
// else Compiler generated conformances
}

#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
extension DispatchData.Region {
static func == (lhs: DispatchData.Region, rhs: DispatchData.Region) -> Bool {
return lhs.withUnsafeBytes { lhsBytes in
Expand All @@ -120,7 +120,7 @@ extension DispatchData.Region {
}
}
}
#if !SubprocessFoundation
#if !canImport(Darwin) || !SubprocessFoundation
/// `DispatchData.Region` is defined in Foundation, but we can't depend on Foundation when the SubprocessFoundation trait is disabled.
extension DispatchData {
typealias Region = _ContiguousBufferView
Expand Down
7 changes: 6 additions & 1 deletion Sources/Subprocess/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ add_library(Subprocess
Result.swift
IO/Output.swift
IO/Input.swift
IO/AsyncIO+Darwin.swift
IO/AsyncIO+Dispatch.swift
IO/AsyncIO+Linux.swift
IO/AsyncIO+Windows.swift
Span+Subprocess.swift
Expand All @@ -36,8 +36,13 @@ elseif(LINUX OR ANDROID)
Platforms/Subprocess+Unix.swift)
elseif(APPLE)
target_sources(Subprocess PRIVATE
Platforms/Subprocess+BSD.swift
Platforms/Subprocess+Darwin.swift
Platforms/Subprocess+Unix.swift)
elseif(FREEBSD OR OPENBSD)
target_sources(Subprocess PRIVATE
Platforms/Subprocess+BSD.swift
Platforms/Subprocess+Unix.swift)
endif()

target_compile_options(Subprocess PRIVATE
Expand Down
14 changes: 7 additions & 7 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ internal struct IODescriptor: ~Copyable {
consuming func createIOChannel() -> IOChannel {
let shouldClose = self.closeWhenDone
self.closeWhenDone = false
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
// Transferring out the ownership of fileDescriptor means we don't have go close here
let closeFd = self.descriptor
let dispatchIO: DispatchIO = DispatchIO(
Expand Down Expand Up @@ -708,10 +708,10 @@ internal struct IODescriptor: ~Copyable {
}

internal struct IOChannel: ~Copyable, @unchecked Sendable {
#if canImport(WinSDK)
typealias Channel = HANDLE
#elseif canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
typealias Channel = DispatchIO
#elseif canImport(WinSDK)
typealias Channel = HANDLE
#else
typealias Channel = FileDescriptor
#endif
Expand All @@ -733,10 +733,10 @@ internal struct IOChannel: ~Copyable, @unchecked Sendable {
}
closeWhenDone = false

#if canImport(WinSDK)
try _safelyClose(.handle(self.channel))
#elseif canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
try _safelyClose(.dispatchIO(self.channel))
#elseif canImport(WinSDK)
try _safelyClose(.handle(self.channel))
#else
try _safelyClose(.fileDescriptor(self.channel))
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
/// Darwin AsyncIO implementation based on DispatchIO

// MARK: - macOS (DispatchIO)
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH

#if canImport(System)
@preconcurrency import System
Expand Down Expand Up @@ -166,4 +166,8 @@ final class AsyncIO: Sendable {
}
}

#if !canImport(Darwin)
extension DispatchData: @retroactive @unchecked Sendable { }
#endif

#endif
15 changes: 10 additions & 5 deletions Sources/Subprocess/IO/AsyncIO+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

/// Linux AsyncIO implementation based on epoll

#if canImport(Glibc) || canImport(Android) || canImport(Musl)
#if os(Linux) || os(Android)

#if canImport(System)
@preconcurrency import System
Expand Down Expand Up @@ -266,6 +266,11 @@ final class AsyncIO: Sendable {
targetEvent = EPOLL_EVENTS(EPOLLOUT)
}

// Save the continuation (before calling epoll_ctl, so we don't miss any data)
_registration.withLock { storage in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate a bit more on why this change is needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's two threads:

  • thread A: thread that calls registerFileDescriptor
  • thread B: monitor loop

With the original code, the following ordering could occur:

  • (thread A) add file descriptor N to epoll
  • (thread B) process event for N
  • (thread B) look up continuation <-- continuation is nil!
  • (thread A) save continuation <-- too late

This change forces the ordering to become:

  • (thread A) save continuation
  • (thread A) add file descriptor N to epoll
  • (thread B) process event for N
  • (thread B) look up continuation <-- continuation cannot be nil

storage[fileDescriptor.rawValue] = continuation
}

var event = epoll_event(
events: targetEvent.rawValue,
data: epoll_data(fd: fileDescriptor.rawValue)
Expand All @@ -277,6 +282,10 @@ final class AsyncIO: Sendable {
&event
)
if rc != 0 {
_registration.withLock { storage in
storage.removeValue(forKey: fileDescriptor.rawValue)
}

let capturedError = errno
let error = SubprocessError(
code: .init(.asyncIOFailed(
Expand All @@ -287,10 +296,6 @@ final class AsyncIO: Sendable {
continuation.finish(throwing: error)
return
}
// Now save the continuation
_registration.withLock { storage in
storage[fileDescriptor.rawValue] = continuation
}
case .failure(let setupError):
continuation.finish(throwing: setupError)
return
Expand Down
12 changes: 6 additions & 6 deletions Sources/Subprocess/IO/Output.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public struct BytesOutput: OutputProtocol {
internal func captureOutput(
from diskIO: consuming IOChannel
) async throws -> [UInt8] {
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
var result: DispatchData? = nil
#else
var result: [UInt8]? = nil
Expand All @@ -173,7 +173,7 @@ public struct BytesOutput: OutputProtocol {
underlyingError: nil
)
}
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
return result?.array() ?? []
#else
return result ?? []
Expand Down Expand Up @@ -302,7 +302,7 @@ extension OutputProtocol {
return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType
}

#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
var result: DispatchData? = nil
#else
var result: [UInt8]? = nil
Expand All @@ -328,7 +328,7 @@ extension OutputProtocol {
)
}

#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
return try self.output(from: result ?? .empty)
#else
return try self.output(from: result ?? [])
Expand All @@ -353,7 +353,7 @@ extension OutputProtocol where OutputType == Void {

#if SubprocessSpan
extension OutputProtocol {
#if canImport(Darwin)
#if SUBPROCESS_ASYNCIO_DISPATCH
internal func output(from data: DispatchData) throws -> OutputType {
guard !data.isEmpty else {
let empty = UnsafeRawBufferPointer(start: nil, count: 0)
Expand All @@ -380,7 +380,7 @@ extension OutputProtocol {
return try self.output(from: span)
}
}
#endif // canImport(Darwin)
#endif // SUBPROCESS_ASYNCIO_DISPATCH
}
#endif

Expand Down
62 changes: 62 additions & 0 deletions Sources/Subprocess/Platforms/Subprocess+BSD.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if os(macOS) || os(FreeBSD) || os(OpenBSD)

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

internal import Dispatch

// MARK: - Process Monitoring
@Sendable
internal func monitorProcessTermination(
for processIdentifier: ProcessIdentifier
) async throws -> TerminationStatus {
switch Result(catching: { () throws(SubprocessError.UnderlyingError) -> TerminationStatus? in try processIdentifier.reap() }) {
case let .success(status?):
return status
case .success(nil):
break
case let .failure(error):
throw SubprocessError(
code: .init(.failedToMonitorProcess),
underlyingError: error
)
}
return try await withCheckedThrowingContinuation { continuation in
let source = DispatchSource.makeProcessSource(
identifier: processIdentifier.value,
eventMask: [.exit],
queue: .global()
)
source.setEventHandler {
source.cancel()
continuation.resume(with: Result(catching: { () throws(SubprocessError.UnderlyingError) -> TerminationStatus in
// NOTE_EXIT may be delivered slightly before the process becomes reapable,
// so we must call waitid without WNOHANG to avoid a narrow possibility of a race condition.
// If waitid does block, it won't do so for very long at all.
try processIdentifier.blockingReap()
}).mapError { underlyingError in
SubprocessError(
code: .init(.failedToMonitorProcess),
underlyingError: underlyingError
)
})
}
source.resume()
}
}

#endif
42 changes: 0 additions & 42 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -497,46 +497,4 @@ extension ProcessIdentifier: CustomStringConvertible, CustomDebugStringConvertib
public var debugDescription: String { "\(self.value)" }
}

// MARK: - Process Monitoring
@Sendable
internal func monitorProcessTermination(
for processIdentifier: ProcessIdentifier
) async throws -> TerminationStatus {
return try await withCheckedThrowingContinuation { continuation in
let source = DispatchSource.makeProcessSource(
identifier: processIdentifier.value,
eventMask: [.exit],
queue: .global()
)
source.setEventHandler {
source.cancel()
var siginfo = siginfo_t()
let rc = waitid(P_PID, id_t(processIdentifier.value), &siginfo, WEXITED)
guard rc == 0 else {
continuation.resume(
throwing: SubprocessError(
code: .init(.failedToMonitorProcess),
underlyingError: .init(rawValue: errno)
)
)
return
}
switch siginfo.si_code {
case .init(CLD_EXITED):
continuation.resume(returning: .exited(siginfo.si_status))
return
case .init(CLD_KILLED), .init(CLD_DUMPED):
continuation.resume(returning: .unhandledException(siginfo.si_status))
case .init(CLD_TRAPPED), .init(CLD_STOPPED), .init(CLD_CONTINUED), .init(CLD_NOOP):
// Ignore these signals because they are not related to
// process exiting
break
default:
fatalError("Unexpected exit status: \(siginfo.si_code)")
}
}
source.resume()
}
}

#endif // canImport(Darwin)
Loading