Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/Subprocess/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_library(Subprocess
Buffer.swift
Error.swift
Teardown.swift
Thread.swift
Result.swift
IO/Output.swift
IO/Input.swift
Expand Down
6 changes: 5 additions & 1 deletion Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public struct Configuration: Sendable {
isolation: isolated (any Actor)? = #isolation,
_ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result)
) async throws -> ExecutionResult<Result> {
let spawnResults = try self.spawn(
let spawnResults = try await self.spawn(
withInput: input,
outputPipe: output,
errorPipe: error
Expand Down Expand Up @@ -660,7 +660,11 @@ internal struct IODescriptor: ~Copyable {
#endif

internal var closeWhenDone: Bool
#if canImport(WinSDK)
internal nonisolated(unsafe) let descriptor: Descriptor
#else
internal let descriptor: Descriptor
#endif

internal init(
_ descriptor: Descriptor,
Expand Down
37 changes: 13 additions & 24 deletions Sources/Subprocess/IO/AsyncIO+Windows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class AsyncIO: @unchecked Sendable {
) rethrows -> ResultType
}

private final class MonitorThreadContext {
private struct MonitorThreadContext: @unchecked Sendable {
let ioCompletionPort: HANDLE

init(ioCompletionPort: HANDLE) {
Expand All @@ -61,9 +61,9 @@ final class AsyncIO: @unchecked Sendable {
var maybeSetupError: SubprocessError? = nil
// Create the the completion port
guard
let port = CreateIoCompletionPort(
let ioCompletionPort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, nil, 0, 0
), port != INVALID_HANDLE_VALUE
), ioCompletionPort != INVALID_HANDLE_VALUE
else {
let error = SubprocessError(
code: .init(.asyncIOFailed("CreateIoCompletionPort failed")),
Expand All @@ -73,17 +73,12 @@ final class AsyncIO: @unchecked Sendable {
self.monitorThread = .failure(error)
return
}
self.ioCompletionPort = .success(port)
self.ioCompletionPort = .success(ioCompletionPort)
// Create monitor thread
let threadContext = MonitorThreadContext(ioCompletionPort: port)
let threadContextPtr = Unmanaged.passRetained(threadContext)
/// Microsoft documentation for `CreateThread` states:
/// > A thread in an executable that calls the C run-time library (CRT)
/// > should use the _beginthreadex and _endthreadex functions for
/// > thread management rather than CreateThread and ExitThread
let threadHandleValue = _beginthreadex(
nil, 0,
{ args in
let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort)
let threadHandle: HANDLE
do {
threadHandle = try begin_thread_x {
func reportError(_ error: SubprocessError) {
let continuations = _registration.withLock { store in
return store.values
Expand All @@ -93,9 +88,6 @@ final class AsyncIO: @unchecked Sendable {
}
}

let unmanaged = Unmanaged<MonitorThreadContext>.fromOpaque(args!)
let context = unmanaged.takeRetainedValue()

// Monitor loop
while true {
var bytesTransferred: DWORD = 0
Expand Down Expand Up @@ -149,16 +141,13 @@ final class AsyncIO: @unchecked Sendable {
}
continuation?.yield(bytesTransferred)
}

return 0
}, threadContextPtr.toOpaque(), 0, nil)
guard threadHandleValue > 0,
let threadHandle = HANDLE(bitPattern: threadHandleValue)
else {
// _beginthreadex uses errno instead of GetLastError()
let capturedError = _subprocess_windows_get_errno()
}
} catch let underlyingError {
let error = SubprocessError(
code: .init(.asyncIOFailed("_beginthreadex failed")),
underlyingError: .init(rawValue: capturedError)
code: .init(.asyncIOFailed("Failed to create monitor thread")),
underlyingError: underlyingError
)
self.monitorThread = .failure(error)
return
Expand Down
63 changes: 44 additions & 19 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible

// MARK: - Spawn
extension Configuration {
// @unchecked Sendable because we need to capture UnsafePointers
// to send to another thread. While UnsafePointers are not
// Sendable, we are not mutating them -- we only need these type
// for C interface.
internal struct SpawnContext: @unchecked Sendable {
let fileActions: posix_spawn_file_actions_t?
let spawnAttributes: posix_spawnattr_t?
let argv: [UnsafeMutablePointer<CChar>?]
let env: [UnsafeMutablePointer<CChar>?]
let uidPtr: UnsafeMutablePointer<uid_t>?
let gidPtr: UnsafeMutablePointer<gid_t>?
}

internal func spawn(
withInput inputPipe: consuming CreatedPipe,
outputPipe: consuming CreatedPipe,
errorPipe: consuming CreatedPipe
) throws -> SpawnResult {
) async throws -> SpawnResult {
// Instead of checking if every possible executable path
// is valid, spawn each directly and catch ENOENT
let possiblePaths = self.executable.possibleExecutablePaths(
Expand All @@ -167,7 +180,7 @@ extension Configuration {
var outputPipeBox: CreatedPipe? = consume outputPipe
var errorPipeBox: CreatedPipe? = consume errorPipe

return try self.preSpawn { args throws -> SpawnResult in
return try await self.preSpawn { args throws -> SpawnResult in
let (env, uidPtr, gidPtr, supplementaryGroups) = args
var _inputPipe = inputPipeBox.take()!
var _outputPipe = outputPipeBox.take()!
Expand All @@ -181,8 +194,6 @@ extension Configuration {
let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor()

for possibleExecutablePath in possiblePaths {
var pid: pid_t = 0

// Setup Arguments
let argv: [UnsafeMutablePointer<CChar>?] = self.arguments.createArgs(
withExecutablePath: possibleExecutablePath
Expand Down Expand Up @@ -389,21 +400,35 @@ extension Configuration {
}

// Spawn
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
return _subprocess_spawn(
&pid,
exePath,
&fileActions,
&spawnAttributes,
argv,
env,
uidPtr,
gidPtr,
Int32(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
let spawnContext = SpawnContext(
fileActions: fileActions,
spawnAttributes: spawnAttributes,
argv: argv,
env: env,
uidPtr: uidPtr,
gidPtr: gidPtr
)
let (spawnError, pid) = try await runOnBackgroundThread {
return possibleExecutablePath.withCString { exePath in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
var pid: pid_t = 0
var _fileActions = spawnContext.fileActions
var _spawnAttributes = spawnContext.spawnAttributes
let rc = _subprocess_spawn(
&pid,
exePath,
&_fileActions,
&_spawnAttributes,
spawnContext.argv,
spawnContext.env,
spawnContext.uidPtr,
spawnContext.gidPtr,
Int32(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
return (rc, pid)
}
}
}
// Spawn error
Expand Down
6 changes: 3 additions & 3 deletions Sources/Subprocess/Platforms/Subprocess+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ private func monitorThreadFunc(context: MonitorThreadContext) {
repeating: epoll_event(events: 0, data: epoll_data(fd: 0)),
count: 256
)
var waitMask = sigset_t();
sigemptyset(&waitMask);
sigaddset(&waitMask, SIGCHLD);
var waitMask = sigset_t()
sigemptyset(&waitMask)
sigaddset(&waitMask, SIGCHLD)
// Enter the monitor loop
monitorLoop: while true {
let eventCount = epoll_pwait(
Expand Down
117 changes: 46 additions & 71 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ extension Configuration {
)

internal func preSpawn<Result: ~Copyable>(
_ work: (PreSpawnArgs) throws -> Result
) throws -> Result {
_ work: (PreSpawnArgs) async throws -> Result
) async throws -> Result {
// Prepare environment
let env = self.environment.createEnv()
defer {
Expand All @@ -378,7 +378,7 @@ extension Configuration {
if let groupsValue = self.platformOptions.supplementaryGroups {
supplementaryGroups = groupsValue
}
return try work(
return try await work(
(
env: env,
uidPtr: uidPtr,
Expand Down Expand Up @@ -415,11 +415,24 @@ internal typealias PlatformFileDescriptor = CInt

#if !canImport(Darwin)
extension Configuration {

// @unchecked Sendable because we need to capture UnsafePointers
// to send to another thread. While UnsafePointers are not
// Sendable, we are not mutating them -- we only need these type
// for C interface.
internal struct SpawnContext: @unchecked Sendable {
let argv: [UnsafeMutablePointer<CChar>?]
let env: [UnsafeMutablePointer<CChar>?]
let uidPtr: UnsafeMutablePointer<uid_t>?
let gidPtr: UnsafeMutablePointer<gid_t>?
let processGroupIDPtr: UnsafeMutablePointer<gid_t>?
}

internal func spawn(
withInput inputPipe: consuming CreatedPipe,
outputPipe: consuming CreatedPipe,
errorPipe: consuming CreatedPipe
) throws -> SpawnResult {
) async throws -> SpawnResult {
// Ensure the waiter thread is running.
#if os(Linux) || os(Android)
_setupMonitorSignalHandler()
Expand All @@ -434,7 +447,7 @@ extension Configuration {
var outputPipeBox: CreatedPipe? = consume outputPipe
var errorPipeBox: CreatedPipe? = consume errorPipe

return try self.preSpawn { args throws -> SpawnResult in
return try await self.preSpawn { args throws -> SpawnResult in
let (env, uidPtr, gidPtr, supplementaryGroups) = args

var _inputPipe = inputPipeBox.take()!
Expand Down Expand Up @@ -472,27 +485,34 @@ extension Configuration {
]

// Spawn
var pid: pid_t = 0
var processDescriptor: PlatformFileDescriptor = -1
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
return (self.workingDirectory?.string).withOptionalCString { workingDir in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
return fileDescriptors.withUnsafeBufferPointer { fds in
return _subprocess_fork_exec(
&pid,
&processDescriptor,
exePath,
workingDir,
fds.baseAddress!,
argv,
env,
uidPtr,
gidPtr,
processGroupIDPtr,
CInt(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
let spawnContext = SpawnContext(
argv: argv, env: env, uidPtr: uidPtr, gidPtr: gidPtr, processGroupIDPtr: processGroupIDPtr
)
let (pid, processDescriptor, spawnError) = try await runOnBackgroundThread {
return possibleExecutablePath.withCString { exePath in
return (self.workingDirectory?.string).withOptionalCString { workingDir in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
return fileDescriptors.withUnsafeBufferPointer { fds in
var pid: pid_t = 0
var processDescriptor: PlatformFileDescriptor = -1

let rc = _subprocess_fork_exec(
&pid,
&processDescriptor,
exePath,
workingDir,
fds.baseAddress!,
spawnContext.argv,
spawnContext.env,
spawnContext.uidPtr,
spawnContext.gidPtr,
spawnContext.processGroupIDPtr,
CInt(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
return (pid, processDescriptor, rc)
}
}
}
}
Expand Down Expand Up @@ -654,51 +674,6 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
return self.description(withIndent: 0)
}
}

// Special keys used in Error's user dictionary
extension String {
static let debugDescriptionErrorKey = "DebugDescription"
}

internal func pthread_create(_ body: @Sendable @escaping () -> ()) throws(SubprocessError.UnderlyingError) -> pthread_t {
final class Context {
let body: @Sendable () -> ()
init(body: @Sendable @escaping () -> Void) {
self.body = body
}
}
#if canImport(Glibc) || canImport(Musl)
func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? {
(Unmanaged<AnyObject>.fromOpaque(context!).takeRetainedValue() as! Context).body()
return nil
}
#elseif canImport(Bionic)
func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
(Unmanaged<AnyObject>.fromOpaque(context).takeRetainedValue() as! Context).body()
return context
}
#endif
#if (os(Linux) && canImport(Glibc)) || canImport(Bionic)
var thread = pthread_t()
#else
var thread: pthread_t?
#endif
let rc = pthread_create(
&thread,
nil,
proc,
Unmanaged.passRetained(Context(body: body)).toOpaque()
)
if rc != 0 {
throw SubprocessError.UnderlyingError(rawValue: rc)
}
#if (os(Linux) && canImport(Glibc)) || canImport(Bionic)
return thread
#else
return thread!
#endif
}

#endif // !canImport(Darwin)

extension ProcessIdentifier {
Expand Down
Loading