diff --git a/Sources/Subprocess/CMakeLists.txt b/Sources/Subprocess/CMakeLists.txt index 53de37c..b9c3409 100644 --- a/Sources/Subprocess/CMakeLists.txt +++ b/Sources/Subprocess/CMakeLists.txt @@ -14,6 +14,7 @@ add_library(Subprocess Buffer.swift Error.swift Teardown.swift + Thread.swift Result.swift IO/Output.swift IO/Input.swift diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index ca8c6a1..4e67429 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -84,7 +84,7 @@ public struct Configuration: Sendable { isolation: isolated (any Actor)? = #isolation, _ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result) ) async throws -> ExecutionResult { - let spawnResults = try self.spawn( + let spawnResults = try await self.spawn( withInput: input, outputPipe: output, errorPipe: error @@ -660,7 +660,11 @@ internal struct IODescriptor: ~Copyable { #endif internal var closeWhenDone: Bool + #if canImport(WinSDK) + internal nonisolated(unsafe) let descriptor: Descriptor + #else internal let descriptor: Descriptor + #endif internal init( _ descriptor: Descriptor, diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index 4e794ee..ad02d94 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -43,7 +43,7 @@ final class AsyncIO: @unchecked Sendable { ) rethrows -> ResultType } - private final class MonitorThreadContext { + private struct MonitorThreadContext: @unchecked Sendable { let ioCompletionPort: HANDLE init(ioCompletionPort: HANDLE) { @@ -61,9 +61,9 @@ final class AsyncIO: @unchecked Sendable { var maybeSetupError: SubprocessError? = nil // Create the the completion port guard - let port = CreateIoCompletionPort( + let ioCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, nil, 0, 0 - ), port != INVALID_HANDLE_VALUE + ), ioCompletionPort != INVALID_HANDLE_VALUE else { let error = SubprocessError( code: .init(.asyncIOFailed("CreateIoCompletionPort failed")), @@ -73,17 +73,12 @@ final class AsyncIO: @unchecked Sendable { self.monitorThread = .failure(error) return } - self.ioCompletionPort = .success(port) + self.ioCompletionPort = .success(ioCompletionPort) // Create monitor thread - let threadContext = MonitorThreadContext(ioCompletionPort: port) - let threadContextPtr = Unmanaged.passRetained(threadContext) - /// Microsoft documentation for `CreateThread` states: - /// > A thread in an executable that calls the C run-time library (CRT) - /// > should use the _beginthreadex and _endthreadex functions for - /// > thread management rather than CreateThread and ExitThread - let threadHandleValue = _beginthreadex( - nil, 0, - { args in + let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort) + let threadHandle: HANDLE + do { + threadHandle = try begin_thread_x { func reportError(_ error: SubprocessError) { let continuations = _registration.withLock { store in return store.values @@ -93,9 +88,6 @@ final class AsyncIO: @unchecked Sendable { } } - let unmanaged = Unmanaged.fromOpaque(args!) - let context = unmanaged.takeRetainedValue() - // Monitor loop while true { var bytesTransferred: DWORD = 0 @@ -149,16 +141,13 @@ final class AsyncIO: @unchecked Sendable { } continuation?.yield(bytesTransferred) } + return 0 - }, threadContextPtr.toOpaque(), 0, nil) - guard threadHandleValue > 0, - let threadHandle = HANDLE(bitPattern: threadHandleValue) - else { - // _beginthreadex uses errno instead of GetLastError() - let capturedError = _subprocess_windows_get_errno() + } + } catch let underlyingError { let error = SubprocessError( - code: .init(.asyncIOFailed("_beginthreadex failed")), - underlyingError: .init(rawValue: capturedError) + code: .init(.asyncIOFailed("Failed to create monitor thread")), + underlyingError: underlyingError ) self.monitorThread = .failure(error) return diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index c1aa76c..4d74c39 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -153,11 +153,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible // MARK: - Spawn extension Configuration { + // @unchecked Sendable because we need to capture UnsafePointers + // to send to another thread. While UnsafePointers are not + // Sendable, we are not mutating them -- we only need these type + // for C interface. + internal struct SpawnContext: @unchecked Sendable { + let fileActions: posix_spawn_file_actions_t? + let spawnAttributes: posix_spawnattr_t? + let argv: [UnsafeMutablePointer?] + let env: [UnsafeMutablePointer?] + let uidPtr: UnsafeMutablePointer? + let gidPtr: UnsafeMutablePointer? + } + internal func spawn( withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { // Instead of checking if every possible executable path // is valid, spawn each directly and catch ENOENT let possiblePaths = self.executable.possibleExecutablePaths( @@ -167,7 +180,7 @@ extension Configuration { var outputPipeBox: CreatedPipe? = consume outputPipe var errorPipeBox: CreatedPipe? = consume errorPipe - return try self.preSpawn { args throws -> SpawnResult in + return try await self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args var _inputPipe = inputPipeBox.take()! var _outputPipe = outputPipeBox.take()! @@ -181,8 +194,6 @@ extension Configuration { let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor() for possibleExecutablePath in possiblePaths { - var pid: pid_t = 0 - // Setup Arguments let argv: [UnsafeMutablePointer?] = self.arguments.createArgs( withExecutablePath: possibleExecutablePath @@ -389,21 +400,35 @@ extension Configuration { } // Spawn - let spawnError: CInt = possibleExecutablePath.withCString { exePath in - return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in - return _subprocess_spawn( - &pid, - exePath, - &fileActions, - &spawnAttributes, - argv, - env, - uidPtr, - gidPtr, - Int32(supplementaryGroups?.count ?? 0), - sgroups?.baseAddress, - self.platformOptions.createSession ? 1 : 0 - ) + let spawnContext = SpawnContext( + fileActions: fileActions, + spawnAttributes: spawnAttributes, + argv: argv, + env: env, + uidPtr: uidPtr, + gidPtr: gidPtr + ) + let (spawnError, pid) = try await runOnBackgroundThread { + return possibleExecutablePath.withCString { exePath in + return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in + var pid: pid_t = 0 + var _fileActions = spawnContext.fileActions + var _spawnAttributes = spawnContext.spawnAttributes + let rc = _subprocess_spawn( + &pid, + exePath, + &_fileActions, + &_spawnAttributes, + spawnContext.argv, + spawnContext.env, + spawnContext.uidPtr, + spawnContext.gidPtr, + Int32(supplementaryGroups?.count ?? 0), + sgroups?.baseAddress, + self.platformOptions.createSession ? 1 : 0 + ) + return (rc, pid) + } } } // Spawn error diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index aee34f8..bd82873 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -225,9 +225,9 @@ private func monitorThreadFunc(context: MonitorThreadContext) { repeating: epoll_event(events: 0, data: epoll_data(fd: 0)), count: 256 ) - var waitMask = sigset_t(); - sigemptyset(&waitMask); - sigaddset(&waitMask, SIGCHLD); + var waitMask = sigset_t() + sigemptyset(&waitMask) + sigaddset(&waitMask, SIGCHLD) // Enter the monitor loop monitorLoop: while true { let eventCount = epoll_pwait( diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 17d5b32..565ad78 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -350,8 +350,8 @@ extension Configuration { ) internal func preSpawn( - _ work: (PreSpawnArgs) throws -> Result - ) throws -> Result { + _ work: (PreSpawnArgs) async throws -> Result + ) async throws -> Result { // Prepare environment let env = self.environment.createEnv() defer { @@ -378,7 +378,7 @@ extension Configuration { if let groupsValue = self.platformOptions.supplementaryGroups { supplementaryGroups = groupsValue } - return try work( + return try await work( ( env: env, uidPtr: uidPtr, @@ -415,11 +415,24 @@ internal typealias PlatformFileDescriptor = CInt #if !canImport(Darwin) extension Configuration { + + // @unchecked Sendable because we need to capture UnsafePointers + // to send to another thread. While UnsafePointers are not + // Sendable, we are not mutating them -- we only need these type + // for C interface. + internal struct SpawnContext: @unchecked Sendable { + let argv: [UnsafeMutablePointer?] + let env: [UnsafeMutablePointer?] + let uidPtr: UnsafeMutablePointer? + let gidPtr: UnsafeMutablePointer? + let processGroupIDPtr: UnsafeMutablePointer? + } + internal func spawn( withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { // Ensure the waiter thread is running. #if os(Linux) || os(Android) _setupMonitorSignalHandler() @@ -434,7 +447,7 @@ extension Configuration { var outputPipeBox: CreatedPipe? = consume outputPipe var errorPipeBox: CreatedPipe? = consume errorPipe - return try self.preSpawn { args throws -> SpawnResult in + return try await self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args var _inputPipe = inputPipeBox.take()! @@ -472,27 +485,34 @@ extension Configuration { ] // Spawn - var pid: pid_t = 0 - var processDescriptor: PlatformFileDescriptor = -1 - let spawnError: CInt = possibleExecutablePath.withCString { exePath in - return (self.workingDirectory?.string).withOptionalCString { workingDir in - return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in - return fileDescriptors.withUnsafeBufferPointer { fds in - return _subprocess_fork_exec( - &pid, - &processDescriptor, - exePath, - workingDir, - fds.baseAddress!, - argv, - env, - uidPtr, - gidPtr, - processGroupIDPtr, - CInt(supplementaryGroups?.count ?? 0), - sgroups?.baseAddress, - self.platformOptions.createSession ? 1 : 0 - ) + let spawnContext = SpawnContext( + argv: argv, env: env, uidPtr: uidPtr, gidPtr: gidPtr, processGroupIDPtr: processGroupIDPtr + ) + let (pid, processDescriptor, spawnError) = try await runOnBackgroundThread { + return possibleExecutablePath.withCString { exePath in + return (self.workingDirectory?.string).withOptionalCString { workingDir in + return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in + return fileDescriptors.withUnsafeBufferPointer { fds in + var pid: pid_t = 0 + var processDescriptor: PlatformFileDescriptor = -1 + + let rc = _subprocess_fork_exec( + &pid, + &processDescriptor, + exePath, + workingDir, + fds.baseAddress!, + spawnContext.argv, + spawnContext.env, + spawnContext.uidPtr, + spawnContext.gidPtr, + spawnContext.processGroupIDPtr, + CInt(supplementaryGroups?.count ?? 0), + sgroups?.baseAddress, + self.platformOptions.createSession ? 1 : 0 + ) + return (pid, processDescriptor, rc) + } } } } @@ -654,51 +674,6 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible return self.description(withIndent: 0) } } - -// Special keys used in Error's user dictionary -extension String { - static let debugDescriptionErrorKey = "DebugDescription" -} - -internal func pthread_create(_ body: @Sendable @escaping () -> ()) throws(SubprocessError.UnderlyingError) -> pthread_t { - final class Context { - let body: @Sendable () -> () - init(body: @Sendable @escaping () -> Void) { - self.body = body - } - } - #if canImport(Glibc) || canImport(Musl) - func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? { - (Unmanaged.fromOpaque(context!).takeRetainedValue() as! Context).body() - return nil - } - #elseif canImport(Bionic) - func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { - (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() - return context - } - #endif - #if (os(Linux) && canImport(Glibc)) || canImport(Bionic) - var thread = pthread_t() - #else - var thread: pthread_t? - #endif - let rc = pthread_create( - &thread, - nil, - proc, - Unmanaged.passRetained(Context(body: body)).toOpaque() - ) - if rc != 0 { - throw SubprocessError.UnderlyingError(rawValue: rc) - } - #if (os(Linux) && canImport(Glibc)) || canImport(Bionic) - return thread - #else - return thread! - #endif -} - #endif // !canImport(Darwin) extension ProcessIdentifier { diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index b352698..6c70a12 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -23,21 +23,30 @@ import _SubprocessCShims // Windows specific implementation extension Configuration { + // @unchecked Sendable because we need to capture UnsafePointers + // to send to another thread. While UnsafePointers are not + // Sendable, we are not mutating them -- we only need these type + // for C interface. + internal struct SpawnContext: @unchecked Sendable { + let startupInfo: UnsafeMutablePointer + let createProcessFlags: DWORD + } + internal func spawn( withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { // Spawn differently depending on whether // we need to spawn as a user guard let userCredentials = self.platformOptions.userCredentials else { - return try self.spawnDirect( + return try await self.spawnDirect( withInput: inputPipe, outputPipe: outputPipe, errorPipe: errorPipe ) } - return try self.spawnAsUser( + return try await self.spawnAsUser( withInput: inputPipe, outputPipe: outputPipe, errorPipe: errorPipe, @@ -49,7 +58,7 @@ extension Configuration { withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { var inputReadFileDescriptor: IODescriptor? = inputPipe.readFileDescriptor() var inputWriteFileDescriptor: IODescriptor? = inputPipe.writeFileDescriptor() var outputReadFileDescriptor: IODescriptor? = outputPipe.readFileDescriptor() @@ -104,10 +113,9 @@ extension Configuration { throw error } - var processInfo: PROCESS_INFORMATION = PROCESS_INFORMATION() var createProcessFlags = self.generateCreateProcessFlag() - let created = try self.withStartupInfoEx( + let (created, processInfo, windowsError) = try await self.withStartupInfoEx( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -121,26 +129,34 @@ extension Configuration { } // Spawn! - return try applicationName.withOptionalNTPathRepresentation { applicationNameW in - try commandAndArgs.withCString( - encodedAs: UTF16.self - ) { commandAndArgsW in - try environment.withCString( + let spawnContext = SpawnContext( + startupInfo: startupInfo, + createProcessFlags: createProcessFlags + ) + return try await runOnBackgroundThread { + return try applicationName.withOptionalNTPathRepresentation { applicationNameW in + try commandAndArgs.withCString( encodedAs: UTF16.self - ) { environmentW in - try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in - CreateProcessW( - applicationNameW, - UnsafeMutablePointer(mutating: commandAndArgsW), - nil, // lpProcessAttributes - nil, // lpThreadAttributes - true, // bInheritHandles - createProcessFlags, - UnsafeMutableRawPointer(mutating: environmentW), - intendedWorkingDirW, - startupInfo.pointer(to: \.StartupInfo)!, - &processInfo - ) + ) { commandAndArgsW in + try environment.withCString( + encodedAs: UTF16.self + ) { environmentW in + try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in + var processInfo = PROCESS_INFORMATION() + let result = CreateProcessW( + applicationNameW, + UnsafeMutablePointer(mutating: commandAndArgsW), + nil, // lpProcessAttributes + nil, // lpThreadAttributes + true, // bInheritHandles + spawnContext.createProcessFlags, + UnsafeMutableRawPointer(mutating: environmentW), + intendedWorkingDirW, + spawnContext.startupInfo.pointer(to: \.StartupInfo)!, + &processInfo + ) + return (result, processInfo, GetLastError()) + } } } } @@ -148,7 +164,6 @@ extension Configuration { } guard created else { - let windowsError = GetLastError() if windowsError == ERROR_FILE_NOT_FOUND || windowsError == ERROR_PATH_NOT_FOUND { // This execution path is not it. Try the next one continue @@ -233,7 +248,7 @@ extension Configuration { outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe, userCredentials: PlatformOptions.UserCredentials - ) throws -> SpawnResult { + ) async throws -> SpawnResult { var inputPipeBox: CreatedPipe? = consume inputPipe var outputPipeBox: CreatedPipe? = consume outputPipe var errorPipeBox: CreatedPipe? = consume errorPipe @@ -297,10 +312,9 @@ extension Configuration { throw error } - var processInfo: PROCESS_INFORMATION = PROCESS_INFORMATION() var createProcessFlags = self.generateCreateProcessFlag() - let created = try self.withStartupInfoEx( + let (created, processInfo, windowsError) = try await self.withStartupInfoEx( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -313,37 +327,45 @@ extension Configuration { try configurator(&createProcessFlags, &startupInfo.pointer(to: \.StartupInfo)!.pointee) } + let spawnContext = SpawnContext( + startupInfo: startupInfo, + createProcessFlags: createProcessFlags + ) // Spawn (featuring pyramid!) - return try userCredentials.username.withCString( - encodedAs: UTF16.self - ) { usernameW in - try userCredentials.password.withCString( + return try await runOnBackgroundThread { + return try userCredentials.username.withCString( encodedAs: UTF16.self - ) { passwordW in - try userCredentials.domain.withOptionalCString( + ) { usernameW in + try userCredentials.password.withCString( encodedAs: UTF16.self - ) { domainW in - try applicationName.withOptionalNTPathRepresentation { applicationNameW in - try commandAndArgs.withCString( - encodedAs: UTF16.self - ) { commandAndArgsW in - try environment.withCString( + ) { passwordW in + try userCredentials.domain.withOptionalCString( + encodedAs: UTF16.self + ) { domainW in + try applicationName.withOptionalNTPathRepresentation { applicationNameW in + try commandAndArgs.withCString( encodedAs: UTF16.self - ) { environmentW in - try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in - CreateProcessWithLogonW( - usernameW, - domainW, - passwordW, - DWORD(LOGON_WITH_PROFILE), - applicationNameW, - UnsafeMutablePointer(mutating: commandAndArgsW), - createProcessFlags, - UnsafeMutableRawPointer(mutating: environmentW), - intendedWorkingDirW, - startupInfo.pointer(to: \.StartupInfo)!, - &processInfo - ) + ) { commandAndArgsW in + try environment.withCString( + encodedAs: UTF16.self + ) { environmentW in + try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in + var processInfo = PROCESS_INFORMATION() + let created = CreateProcessWithLogonW( + usernameW, + domainW, + passwordW, + DWORD(LOGON_WITH_PROFILE), + applicationNameW, + UnsafeMutablePointer(mutating: commandAndArgsW), + spawnContext.createProcessFlags, + UnsafeMutableRawPointer(mutating: environmentW), + intendedWorkingDirW, + spawnContext.startupInfo.pointer(to: \.StartupInfo)!, + &processInfo + ) + return (created, processInfo, GetLastError()) + } } } } @@ -354,8 +376,6 @@ extension Configuration { } guard created else { - let windowsError = GetLastError() - if windowsError == ERROR_FILE_NOT_FOUND || windowsError == ERROR_PATH_NOT_FOUND { // This executable path is not it. Try the next one continue @@ -1045,8 +1065,8 @@ extension Configuration { outputWrite outputWriteFileDescriptor: borrowing IODescriptor?, errorRead errorReadFileDescriptor: borrowing IODescriptor?, errorWrite errorWriteFileDescriptor: borrowing IODescriptor?, - _ body: (UnsafeMutablePointer) throws -> Result - ) rethrows -> Result { + _ body: (UnsafeMutablePointer) async throws -> Result + ) async throws -> Result { var info: STARTUPINFOEXW = STARTUPINFOEXW() info.StartupInfo.cb = DWORD(MemoryLayout.size(ofValue: info)) info.StartupInfo.dwFlags |= DWORD(STARTF_USESTDHANDLES) @@ -1112,35 +1132,41 @@ extension Configuration { let alignment = 16 var attributeListByteCount = SIZE_T(0) _ = InitializeProcThreadAttributeList(nil, 1, 0, &attributeListByteCount) - return try withUnsafeTemporaryAllocation(byteCount: Int(attributeListByteCount), alignment: alignment) { attributeListPtr in - let attributeList = LPPROC_THREAD_ATTRIBUTE_LIST(attributeListPtr.baseAddress!) - guard InitializeProcThreadAttributeList(attributeList, 1, 0, &attributeListByteCount) else { - throw SubprocessError( - code: .init(.spawnFailed), - underlyingError: .init(rawValue: GetLastError()) - ) - } - defer { - DeleteProcThreadAttributeList(attributeList) - } + // We can't use withUnsafeTemporaryAllocation here because body is async + let attributeListPtr: UnsafeMutableRawBufferPointer = .allocate( + byteCount: Int(attributeListByteCount), + alignment: alignment + ) + defer { + attributeListPtr.deallocate() + } - var handles = Array(inheritedHandles) - return try handles.withUnsafeMutableBufferPointer { inheritedHandlesPtr in - _ = UpdateProcThreadAttribute( - attributeList, - 0, - _subprocess_PROC_THREAD_ATTRIBUTE_HANDLE_LIST(), - inheritedHandlesPtr.baseAddress!, - SIZE_T(MemoryLayout.stride * inheritedHandlesPtr.count), - nil, - nil - ) + let attributeList = LPPROC_THREAD_ATTRIBUTE_LIST(attributeListPtr.baseAddress!) + guard InitializeProcThreadAttributeList(attributeList, 1, 0, &attributeListByteCount) else { + throw SubprocessError( + code: .init(.spawnFailed), + underlyingError: .init(rawValue: GetLastError()) + ) + } + defer { + DeleteProcThreadAttributeList(attributeList) + } - info.lpAttributeList = attributeList + var handles = Array(inheritedHandles) + handles.withUnsafeMutableBufferPointer { inheritedHandlesPtr in + _ = UpdateProcThreadAttribute( + attributeList, + 0, + _subprocess_PROC_THREAD_ATTRIBUTE_HANDLE_LIST(), + inheritedHandlesPtr.baseAddress!, + SIZE_T(MemoryLayout.stride * inheritedHandlesPtr.count), + nil, + nil + ) - return try body(&info) - } + info.lpAttributeList = attributeList } + return try await body(&info) } private func generateWindowsCommandAndArguments( diff --git a/Sources/Subprocess/Thread.swift b/Sources/Subprocess/Thread.swift new file mode 100644 index 0000000..a4e5c0c --- /dev/null +++ b/Sources/Subprocess/Thread.swift @@ -0,0 +1,345 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import Darwin +import os +#elseif canImport(Glibc) +import Glibc +#elseif canImport(Bionic) +import Bionic +#elseif canImport(Musl) +import Musl +#elseif canImport(WinSDK) +import WinSDK +#endif + +internal import Dispatch +import _SubprocessCShims + +#if canImport(Synchronization) +import Synchronization +#endif + +#if canImport(WinSDK) +private typealias MutexType = CRITICAL_SECTION +private typealias ConditionType = CONDITION_VARIABLE +private typealias ThreadType = HANDLE +#else +private typealias MutexType = pthread_mutex_t +private typealias ConditionType = pthread_cond_t +private typealias ThreadType = pthread_t +#endif + +internal func runOnBackgroundThread( + _ body: @Sendable @escaping () throws -> Result +) async throws -> Result { + // Only executed once + _setupWorkerThread + + let result = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let workItem = BackgroundWorkItem(body, continuation: continuation) + _workQueue.enqueue(workItem) + } + return result +} + +private struct BackgroundWorkItem { + private let work: @Sendable () -> Void + + init( + _ body: @Sendable @escaping () throws -> Result, + continuation: CheckedContinuation + ) { + self.work = { + do { + let result = try body() + continuation.resume(returning: result) + } catch { + continuation.resume(throwing: error) + } + } + } + + func run() { + self.work() + } +} + +// We can't use Mutex directly here because we need the underlying `pthread_mutex_t` to be +// exposed so we can use it with `pthread_cond_wait`. +private final class WorkQueue: Sendable { + private nonisolated(unsafe) var queue: [BackgroundWorkItem] + internal nonisolated(unsafe) let mutex: UnsafeMutablePointer + internal nonisolated(unsafe) let waitCondition: UnsafeMutablePointer + + init() { + self.queue = [] + self.mutex = UnsafeMutablePointer.allocate(capacity: 1) + self.waitCondition = UnsafeMutablePointer.allocate(capacity: 1) + #if canImport(WinSDK) + InitializeCriticalSection(self.mutex) + InitializeConditionVariable(self.waitCondition) + #else + pthread_mutex_init(self.mutex, nil) + pthread_cond_init(self.waitCondition, nil) + #endif + } + + func withLock(_ body: (inout [BackgroundWorkItem]) throws -> R) rethrows -> R { + try withUnsafeUnderlyingLock { _, queue in + try body(&queue) + } + } + + private func withUnsafeUnderlyingLock( + _ body: (UnsafeMutablePointer, inout [BackgroundWorkItem]) throws -> R + ) rethrows -> R { + #if canImport(WinSDK) + EnterCriticalSection(self.mutex) + defer { + LeaveCriticalSection(self.mutex) + } + #else + pthread_mutex_lock(self.mutex) + defer { + pthread_mutex_unlock(mutex) + } + #endif + return try body(mutex, &queue) + } + + // Only called in worker thread. Sleeps the thread if there's no more item + func dequeue() -> BackgroundWorkItem? { + return self.withUnsafeUnderlyingLock { mutex, queue in + if queue.isEmpty { + // Sleep the worker thread if there's no more work + #if canImport(WinSDK) + SleepConditionVariableCS(self.waitCondition, mutex, INFINITE) + #else + pthread_cond_wait(self.waitCondition, mutex) + #endif + } + guard !queue.isEmpty else { + return nil + } + return queue.removeFirst() + } + } + + // Only called in parent thread. Signals wait condition to wake up worker thread + func enqueue(_ workItem: BackgroundWorkItem) { + self.withLock { queue in + queue.append(workItem) + #if canImport(WinSDK) + WakeConditionVariable(self.waitCondition) + #else + pthread_cond_signal(self.waitCondition) + #endif + } + } + + func shutdown() { + self.withLock { queue in + queue.removeAll() + #if canImport(WinSDK) + WakeConditionVariable(self.waitCondition) + #else + pthread_cond_signal(self.waitCondition) + #endif + } + } +} + +private let _workQueue = WorkQueue() +private let _workQueueShutdownFlag = AtomicCounter() + +// Okay to be unlocked global mutable because this value is only set once like dispatch_once +private nonisolated(unsafe) var _workerThread: Result = .failure(SubprocessError(code: .init(.spawnFailed), underlyingError: nil)) + +private let _setupWorkerThread: () = { + do { + #if canImport(WinSDK) + let workerThread = try begin_thread_x { + while true { + // This dequeue call will suspend the thread if there's no more work left + guard let workItem = _workQueue.dequeue() else { + break + } + workItem.run() + } + return 0 + } + #else + let workerThread = try pthread_create { + while true { + // This dequeue call will suspend the thread if there's no more work left + guard let workItem = _workQueue.dequeue() else { + break + } + workItem.run() + } + } + #endif + _workerThread = .success(workerThread) + } catch { + _workerThread = .failure(error) + } + + atexit { + _shutdownWorkerThread() + } +}() + +private func _shutdownWorkerThread() { + guard case .success(let thread) = _workerThread else { + return + } + guard _workQueueShutdownFlag.addOne() == 1 else { + // We already shutdown this thread + return + } + _workQueue.shutdown() + #if canImport(WinSDK) + WaitForSingleObject(thread, INFINITE) + CloseHandle(thread) + DeleteCriticalSection(_workQueue.mutex) + // We do not need to destroy CONDITION_VARIABLE + #else + pthread_join(thread, nil) + pthread_mutex_destroy(_workQueue.mutex) + pthread_cond_destroy(_workQueue.waitCondition) + #endif + _workQueue.mutex.deallocate() + _workQueue.waitCondition.deallocate() +} + +// MARK: - AtomicCounter + +#if canImport(Darwin) +// Unfortunately on Darwin we cannot unconditionally use Atomic since it requires macOS 15 +internal struct AtomicCounter: ~Copyable { + private let storage: OSAllocatedUnfairLock + + internal init() { + self.storage = .init(initialState: 0) + } + + internal func addOne() -> UInt8 { + return self.storage.withLock { + $0 += 1 + return $0 + } + } +} +#else +internal struct AtomicCounter: ~Copyable { + + private let storage: Atomic + + internal init() { + self.storage = Atomic(0) + } + + internal func addOne() -> UInt8 { + return self.storage.add(1, ordering: .sequentiallyConsistent).newValue + } +} +#endif + +// MARK: - Thread Creation Primitives +#if canImport(WinSDK) +/// Microsoft documentation for `CreateThread` states: +/// > A thread in an executable that calls the C run-time library (CRT) +/// > should use the _beginthreadex and _endthreadex functions for +/// > thread management rather than CreateThread and ExitThread +internal func begin_thread_x( + _ body: @Sendable @escaping () -> UInt32 +) throws(SubprocessError.UnderlyingError) -> HANDLE { + final class Context { + let body: @Sendable () -> UInt32 + init(body: @Sendable @escaping () -> UInt32) { + self.body = body + } + } + + func proc(_ context: UnsafeMutableRawPointer?) -> UInt32 { + return Unmanaged.fromOpaque(context!).takeRetainedValue().body() + } + + let threadHandleValue = _beginthreadex( + nil, + 0, + proc, + Unmanaged.passRetained(Context(body: body)).toOpaque(), + 0, + nil + ) + guard threadHandleValue != 0, + let threadHandle = HANDLE(bitPattern: threadHandleValue) + else { + // _beginthreadex uses errno instead of GetLastError() + let capturedError = _subprocess_windows_get_errno() + throw SubprocessError.UnderlyingError(rawValue: DWORD(capturedError)) + } + + return threadHandle +} +#else + +internal func pthread_create( + _ body: @Sendable @escaping () -> () +) throws(SubprocessError.UnderlyingError) -> pthread_t { + final class Context { + let body: @Sendable () -> () + init(body: @Sendable @escaping () -> Void) { + self.body = body + } + } + #if canImport(Darwin) + func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? { + (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() + return context + } + #elseif canImport(Glibc) || canImport(Musl) + func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? { + (Unmanaged.fromOpaque(context!).takeRetainedValue() as! Context).body() + return context + } + #elseif canImport(Bionic) + func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { + (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() + return context + } + #endif + + #if canImport(Glibc) || canImport(Bionic) + var thread = pthread_t() + #else + var thread: pthread_t? + #endif + let rc = pthread_create( + &thread, + nil, + proc, + Unmanaged.passRetained(Context(body: body)).toOpaque() + ) + if rc != 0 { + throw SubprocessError.UnderlyingError(rawValue: rc) + } + #if canImport(Glibc) || canImport(Bionic) + return thread + #else + return thread! + #endif +} + +#endif // canImport(WinSDK) diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index a05f40a..7b725b5 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -135,7 +135,8 @@ extension SubprocessAsyncIOTests { // MARK: - Error Handling Tests extension SubprocessAsyncIOTests { - @Test func testWriteToClosedPipe() async throws { + @Test(.disabled("Cannot safely write to a closed fd without risking it being reused")) + func testWriteToClosedPipe() async throws { var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) var writeChannel = try _require(pipe.writeFileDescriptor()).createIOChannel() var readChannel = try _require(pipe.readFileDescriptor()).createIOChannel() @@ -171,7 +172,8 @@ extension SubprocessAsyncIOTests { } - @Test func testReadFromClosedPipe() async throws { + @Test(.disabled("Cannot safely read from a closed fd without risking it being reused")) + func testReadFromClosedPipe() async throws { var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) var writeChannel = try _require(pipe.writeFileDescriptor()).createIOChannel() var readChannel = try _require(pipe.readFileDescriptor()).createIOChannel() diff --git a/Tests/SubprocessTests/ProcessMonitoringTests.swift b/Tests/SubprocessTests/ProcessMonitoringTests.swift index e0a869c..1b743d4 100644 --- a/Tests/SubprocessTests/ProcessMonitoringTests.swift +++ b/Tests/SubprocessTests/ProcessMonitoringTests.swift @@ -103,7 +103,7 @@ struct SubprocessProcessMonitoringTests { config: Configuration, _ body: (Execution) async throws -> Void ) async throws { - let spawnResult = try config.spawn( + let spawnResult = try await config.spawn( withInput: self.devNullInputPipe(), outputPipe: self.devNullOutputPipe(), errorPipe: self.devNullOutputPipe() @@ -307,7 +307,7 @@ extension SubprocessProcessMonitoringTests { for _ in 0..