Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/Subprocess/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_library(Subprocess
Buffer.swift
Error.swift
Teardown.swift
Thread.swift
Result.swift
IO/Output.swift
IO/Input.swift
Expand Down
6 changes: 5 additions & 1 deletion Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public struct Configuration: Sendable {
isolation: isolated (any Actor)? = #isolation,
_ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result)
) async throws -> ExecutionResult<Result> {
let spawnResults = try self.spawn(
let spawnResults = try await self.spawn(
withInput: input,
outputPipe: output,
errorPipe: error
Expand Down Expand Up @@ -660,7 +660,11 @@ internal struct IODescriptor: ~Copyable {
#endif

internal var closeWhenDone: Bool
#if canImport(WinSDK)
internal nonisolated(unsafe) let descriptor: Descriptor
#else
internal let descriptor: Descriptor
#endif

internal init(
_ descriptor: Descriptor,
Expand Down
37 changes: 13 additions & 24 deletions Sources/Subprocess/IO/AsyncIO+Windows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class AsyncIO: @unchecked Sendable {
) rethrows -> ResultType
}

private final class MonitorThreadContext {
private struct MonitorThreadContext: @unchecked Sendable {
let ioCompletionPort: HANDLE

init(ioCompletionPort: HANDLE) {
Expand All @@ -61,9 +61,9 @@ final class AsyncIO: @unchecked Sendable {
var maybeSetupError: SubprocessError? = nil
// Create the the completion port
guard
let port = CreateIoCompletionPort(
let ioCompletionPort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, nil, 0, 0
), port != INVALID_HANDLE_VALUE
), ioCompletionPort != INVALID_HANDLE_VALUE
else {
let error = SubprocessError(
code: .init(.asyncIOFailed("CreateIoCompletionPort failed")),
Expand All @@ -73,17 +73,12 @@ final class AsyncIO: @unchecked Sendable {
self.monitorThread = .failure(error)
return
}
self.ioCompletionPort = .success(port)
self.ioCompletionPort = .success(ioCompletionPort)
// Create monitor thread
let threadContext = MonitorThreadContext(ioCompletionPort: port)
let threadContextPtr = Unmanaged.passRetained(threadContext)
/// Microsoft documentation for `CreateThread` states:
/// > A thread in an executable that calls the C run-time library (CRT)
/// > should use the _beginthreadex and _endthreadex functions for
/// > thread management rather than CreateThread and ExitThread
let threadHandleValue = _beginthreadex(
nil, 0,
{ args in
let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort)
let threadHandle: HANDLE
do {
threadHandle = try begin_thread_x {
func reportError(_ error: SubprocessError) {
let continuations = _registration.withLock { store in
return store.values
Expand All @@ -93,9 +88,6 @@ final class AsyncIO: @unchecked Sendable {
}
}

let unmanaged = Unmanaged<MonitorThreadContext>.fromOpaque(args!)
let context = unmanaged.takeRetainedValue()

// Monitor loop
while true {
var bytesTransferred: DWORD = 0
Expand Down Expand Up @@ -149,16 +141,13 @@ final class AsyncIO: @unchecked Sendable {
}
continuation?.yield(bytesTransferred)
}

return 0
}, threadContextPtr.toOpaque(), 0, nil)
guard threadHandleValue > 0,
let threadHandle = HANDLE(bitPattern: threadHandleValue)
else {
// _beginthreadex uses errno instead of GetLastError()
let capturedError = _subprocess_windows_get_errno()
}
} catch let underlyingError {
let error = SubprocessError(
code: .init(.asyncIOFailed("_beginthreadex failed")),
underlyingError: .init(rawValue: capturedError)
code: .init(.asyncIOFailed("Failed to create monitor thread")),
underlyingError: underlyingError
)
self.monitorThread = .failure(error)
return
Expand Down
63 changes: 44 additions & 19 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible

// MARK: - Spawn
extension Configuration {
// @unchecked Sendable because we need to capture UnsafePointers
// to send to another thread. While UnsafePointers are not
// Sendable, we are not mutating them -- we only need these type
// for C interface.
internal struct SpawnContext: @unchecked Sendable {
let fileActions: posix_spawn_file_actions_t?
let spawnAttributes: posix_spawnattr_t?
let argv: [UnsafeMutablePointer<CChar>?]
let env: [UnsafeMutablePointer<CChar>?]
let uidPtr: UnsafeMutablePointer<uid_t>?
let gidPtr: UnsafeMutablePointer<gid_t>?
}

internal func spawn(
withInput inputPipe: consuming CreatedPipe,
outputPipe: consuming CreatedPipe,
errorPipe: consuming CreatedPipe
) throws -> SpawnResult {
) async throws -> SpawnResult {
// Instead of checking if every possible executable path
// is valid, spawn each directly and catch ENOENT
let possiblePaths = self.executable.possibleExecutablePaths(
Expand All @@ -167,7 +180,7 @@ extension Configuration {
var outputPipeBox: CreatedPipe? = consume outputPipe
var errorPipeBox: CreatedPipe? = consume errorPipe

return try self.preSpawn { args throws -> SpawnResult in
return try await self.preSpawn { args throws -> SpawnResult in
let (env, uidPtr, gidPtr, supplementaryGroups) = args
var _inputPipe = inputPipeBox.take()!
var _outputPipe = outputPipeBox.take()!
Expand All @@ -181,8 +194,6 @@ extension Configuration {
let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor()

for possibleExecutablePath in possiblePaths {
var pid: pid_t = 0

// Setup Arguments
let argv: [UnsafeMutablePointer<CChar>?] = self.arguments.createArgs(
withExecutablePath: possibleExecutablePath
Expand Down Expand Up @@ -389,21 +400,35 @@ extension Configuration {
}

// Spawn
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
return _subprocess_spawn(
&pid,
exePath,
&fileActions,
&spawnAttributes,
argv,
env,
uidPtr,
gidPtr,
Int32(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
let spawnContext = SpawnContext(
fileActions: fileActions,
spawnAttributes: spawnAttributes,
argv: argv,
env: env,
uidPtr: uidPtr,
gidPtr: gidPtr
)
let (spawnError, pid) = try await runOnBackgroundThread {
return possibleExecutablePath.withCString { exePath in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
var pid: pid_t = 0
var _fileActions = spawnContext.fileActions
var _spawnAttributes = spawnContext.spawnAttributes
let rc = _subprocess_spawn(
&pid,
exePath,
&_fileActions,
&_spawnAttributes,
spawnContext.argv,
spawnContext.env,
spawnContext.uidPtr,
spawnContext.gidPtr,
Int32(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
return (rc, pid)
}
}
}
// Spawn error
Expand Down
117 changes: 46 additions & 71 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ extension Configuration {
)

internal func preSpawn<Result: ~Copyable>(
_ work: (PreSpawnArgs) throws -> Result
) throws -> Result {
_ work: (PreSpawnArgs) async throws -> Result
) async throws -> Result {
// Prepare environment
let env = self.environment.createEnv()
defer {
Expand All @@ -378,7 +378,7 @@ extension Configuration {
if let groupsValue = self.platformOptions.supplementaryGroups {
supplementaryGroups = groupsValue
}
return try work(
return try await work(
(
env: env,
uidPtr: uidPtr,
Expand Down Expand Up @@ -415,11 +415,24 @@ internal typealias PlatformFileDescriptor = CInt

#if !canImport(Darwin)
extension Configuration {

// @unchecked Sendable because we need to capture UnsafePointers
// to send to another thread. While UnsafePointers are not
// Sendable, we are not mutating them -- we only need these type
// for C interface.
internal struct SpawnContext: @unchecked Sendable {
let argv: [UnsafeMutablePointer<CChar>?]
let env: [UnsafeMutablePointer<CChar>?]
let uidPtr: UnsafeMutablePointer<uid_t>?
let gidPtr: UnsafeMutablePointer<gid_t>?
let processGroupIDPtr: UnsafeMutablePointer<gid_t>?
}

internal func spawn(
withInput inputPipe: consuming CreatedPipe,
outputPipe: consuming CreatedPipe,
errorPipe: consuming CreatedPipe
) throws -> SpawnResult {
) async throws -> SpawnResult {
// Ensure the waiter thread is running.
#if os(Linux) || os(Android)
_setupMonitorSignalHandler()
Expand All @@ -434,7 +447,7 @@ extension Configuration {
var outputPipeBox: CreatedPipe? = consume outputPipe
var errorPipeBox: CreatedPipe? = consume errorPipe

return try self.preSpawn { args throws -> SpawnResult in
return try await self.preSpawn { args throws -> SpawnResult in
let (env, uidPtr, gidPtr, supplementaryGroups) = args

var _inputPipe = inputPipeBox.take()!
Expand Down Expand Up @@ -472,27 +485,34 @@ extension Configuration {
]

// Spawn
var pid: pid_t = 0
var processDescriptor: PlatformFileDescriptor = -1
let spawnError: CInt = possibleExecutablePath.withCString { exePath in
return (self.workingDirectory?.string).withOptionalCString { workingDir in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
return fileDescriptors.withUnsafeBufferPointer { fds in
return _subprocess_fork_exec(
&pid,
&processDescriptor,
exePath,
workingDir,
fds.baseAddress!,
argv,
env,
uidPtr,
gidPtr,
processGroupIDPtr,
CInt(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
let spawnContext = SpawnContext(
argv: argv, env: env, uidPtr: uidPtr, gidPtr: gidPtr, processGroupIDPtr: processGroupIDPtr
)
let (pid, processDescriptor, spawnError) = try await runOnBackgroundThread {
return possibleExecutablePath.withCString { exePath in
return (self.workingDirectory?.string).withOptionalCString { workingDir in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
return fileDescriptors.withUnsafeBufferPointer { fds in
var pid: pid_t = 0
var processDescriptor: PlatformFileDescriptor = -1

let rc = _subprocess_fork_exec(
&pid,
&processDescriptor,
exePath,
workingDir,
fds.baseAddress!,
spawnContext.argv,
spawnContext.env,
spawnContext.uidPtr,
spawnContext.gidPtr,
spawnContext.processGroupIDPtr,
CInt(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
return (pid, processDescriptor, rc)
}
}
}
}
Expand Down Expand Up @@ -654,51 +674,6 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
return self.description(withIndent: 0)
}
}

// Special keys used in Error's user dictionary
extension String {
static let debugDescriptionErrorKey = "DebugDescription"
}

internal func pthread_create(_ body: @Sendable @escaping () -> ()) throws(SubprocessError.UnderlyingError) -> pthread_t {
final class Context {
let body: @Sendable () -> ()
init(body: @Sendable @escaping () -> Void) {
self.body = body
}
}
#if canImport(Glibc) || canImport(Musl)
func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? {
(Unmanaged<AnyObject>.fromOpaque(context!).takeRetainedValue() as! Context).body()
return nil
}
#elseif canImport(Bionic)
func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
(Unmanaged<AnyObject>.fromOpaque(context).takeRetainedValue() as! Context).body()
return context
}
#endif
#if (os(Linux) && canImport(Glibc)) || canImport(Bionic)
var thread = pthread_t()
#else
var thread: pthread_t?
#endif
let rc = pthread_create(
&thread,
nil,
proc,
Unmanaged.passRetained(Context(body: body)).toOpaque()
)
if rc != 0 {
throw SubprocessError.UnderlyingError(rawValue: rc)
}
#if (os(Linux) && canImport(Glibc)) || canImport(Bionic)
return thread
#else
return thread!
#endif
}

#endif // !canImport(Darwin)

extension ProcessIdentifier {
Expand Down
Loading
Loading